<?php

namespace app\push\controller;

use think\worker\Server;
use Workerman\Lib\Timer;
use Workerman\Worker as WorkermanWorker;

// 心跳间隔55秒
define('HEARTBEAT_TIME', 55);

class Worker extends Server
{
    protected $socket = 'websocket://0.0.0.0:2346';
    protected $processes = 1;
    private $uidConnections = [];
    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        // 给connection临时设置一个lastMessageTime属性，用来记录上次收到消息的时间
        $connection->lastMessageTime = time();
        // 判断当前客户端是否已经验证,既是否设置了uid
        if (!isset($connection->uid)) {
            // 没验证的话把第一个包当做uid（这里为了方便演示，没做真正的验证）
            $connection->uid = $data;
            /* 保存uid到connection的映射，这样可以方便的通过uid查找connection，
         * 实现针对特定uid推送数据
         */
            $this->uidConnections[$connection->uid] = $connection;
            return;
        }
    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection) {}

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {
        if (isset($connection->uid)) {
            // 连接断开时删除映射
            unset($this->uidConnections[$connection->uid]);
        }
    }

    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
        Timer::add(10, function () use ($worker) {
            $time_now = time();
            foreach ($worker->connections as $connection) {
                // 有可能该connection还没收到过消息，则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔，则认为客户端已经下线，关闭连接
                if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                    $connection->close();
                }
            }
        });
        // 开启一个内部端口，方便内部系统推送数据，Text协议格式 文本+换行符
        $inner_text_worker = new WorkermanWorker('Text://0.0.0.0:5678');
        $inner_text_worker->onMessage = function ($connection, $buffer) {
            // $data数组格式，里面有uid，表示向那个uid的页面推送数据
            $data = json_decode($buffer, true);
            $uid = $data['uid'];
            // 通过workerman，向uid的页面推送数据
            $ret = $this->sendMessageByUid($uid, json_encode($data));
            // 返回推送结果
            $connection->send($ret ? 'ok' : "uid $uid not online");
        };
        $inner_text_worker->listen();
    }

    // 向所有验证的用户推送数据
    public function broadcast($message)
    {
        foreach ($this->uidConnections as $connection) {
            $connection->send($message);
        }
    }
    // 针对uid推送数据
    public function sendMessageByUid($uid, $message)
    {
        if (isset($this->uidConnections[$uid])) {
            $connection = $this->uidConnections[$uid];
            $connection->send($message);
            return true;
        }
        return false;
    }
}
